/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
#include "utils.h"
#include <pulsar/ConsoleLoggerFactory.h>
#include "lib/Utils.h"
#include <memory>

template <typename T>
struct ListenerWrapper {
    PyObject* _pyListener;

    ListenerWrapper(py::object pyListener) : _pyListener(pyListener.ptr()) { Py_XINCREF(_pyListener); }

    ListenerWrapper(const ListenerWrapper& other) {
        _pyListener = other._pyListener;
        Py_XINCREF(_pyListener);
    }

    ListenerWrapper& operator=(const ListenerWrapper& other) {
        _pyListener = other._pyListener;
        Py_XINCREF(_pyListener);
        return *this;
    }

    virtual ~ListenerWrapper() { Py_XDECREF(_pyListener); }

    void operator()(T consumer, const Message& msg) {
        PyGILState_STATE state = PyGILState_Ensure();

        try {
            py::call<void>(_pyListener, py::object(&consumer), py::object(&msg));
        } catch (const py::error_already_set& e) {
            PyErr_Print();
        }

        PyGILState_Release(state);
    }
};

/**
 * Installs a Python callable as the message listener of a consumer configuration.
 *
 * @param conf configuration to modify
 * @param pyListener Python callable, wrapped in a ListenerWrapper<Consumer>
 * @return `conf`, so that calls can be chained from Python
 */
static ConsumerConfiguration& ConsumerConfiguration_setMessageListener(ConsumerConfiguration& conf,
                                                                       py::object pyListener) {
    ListenerWrapper<Consumer> listener(pyListener);
    conf.setMessageListener(listener);
    return conf;
}

/**
 * Installs a Python callable as the reader listener of a reader configuration.
 *
 * @param conf configuration to modify
 * @param pyListener Python callable, wrapped in a ListenerWrapper<Reader>
 * @return `conf`, so that calls can be chained from Python
 */
static ReaderConfiguration& ReaderConfiguration_setReaderListener(ReaderConfiguration& conf,
                                                                  py::object pyListener) {
    ListenerWrapper<Reader> listener(pyListener);
    conf.setReaderListener(listener);
    return conf;
}

/**
 * Applies the authentication carried by a Python-side AuthenticationWrapper to a client configuration.
 *
 * @param conf configuration to modify
 * @param authentication Python object convertible to AuthenticationWrapper
 * @return `conf`, so that calls can be chained from Python
 */
static ClientConfiguration& ClientConfiguration_setAuthentication(ClientConfiguration& conf,
                                                                  py::object authentication) {
    py::extract<AuthenticationWrapper> extractAuth(authentication);
    AuthenticationWrapper authWrapper = extractAuth;
    conf.setAuth(authWrapper.auth);
    return conf;
}

/**
 * Sets the crypto key reader of a consumer configuration from a Python-side CryptoKeyReaderWrapper.
 *
 * @param conf configuration to modify
 * @param cryptoKeyReader Python object convertible to CryptoKeyReaderWrapper
 * @return `conf`, so that calls can be chained from Python
 */
static ConsumerConfiguration& ConsumerConfiguration_setCryptoKeyReader(ConsumerConfiguration& conf,
                                                                       py::object cryptoKeyReader) {
    py::extract<CryptoKeyReaderWrapper> extractReader(cryptoKeyReader);
    CryptoKeyReaderWrapper readerWrapper = extractReader;
    conf.setCryptoKeyReader(readerWrapper.cryptoKeyReader);
    return conf;
}

/**
 * Sets the crypto key reader of a producer configuration from a Python-side CryptoKeyReaderWrapper.
 *
 * @param conf configuration to modify
 * @param cryptoKeyReader Python object convertible to CryptoKeyReaderWrapper
 * @return `conf`, so that calls can be chained from Python
 */
static ProducerConfiguration& ProducerConfiguration_setCryptoKeyReader(ProducerConfiguration& conf,
                                                                       py::object cryptoKeyReader) {
    py::extract<CryptoKeyReaderWrapper> extractReader(cryptoKeyReader);
    CryptoKeyReaderWrapper readerWrapper = extractReader;
    conf.setCryptoKeyReader(readerWrapper.cryptoKeyReader);
    return conf;
}

/**
 * Sets the crypto key reader of a reader configuration from a Python-side CryptoKeyReaderWrapper.
 *
 * @param conf configuration to modify
 * @param cryptoKeyReader Python object convertible to CryptoKeyReaderWrapper
 * @return `conf`, so that calls can be chained from Python
 */
static ReaderConfiguration& ReaderConfiguration_setCryptoKeyReader(ReaderConfiguration& conf,
                                                                   py::object cryptoKeyReader) {
    py::extract<CryptoKeyReaderWrapper> extractReader(cryptoKeyReader);
    CryptoKeyReaderWrapper readerWrapper = extractReader;
    conf.setCryptoKeyReader(readerWrapper.cryptoKeyReader);
    return conf;
}

class LoggerWrapper : public Logger {
    PyObject* const _pyLogger;
    const int _pythonLogLevel;
    const std::unique_ptr<Logger> _fallbackLogger;

    static constexpr int _getLogLevelValue(Level level) { return 10 + (level * 10); }

   public:
    LoggerWrapper(PyObject* pyLogger, int pythonLogLevel, Logger* fallbackLogger)
        : _pyLogger(pyLogger), _pythonLogLevel(pythonLogLevel), _fallbackLogger(fallbackLogger) {
        Py_XINCREF(_pyLogger);
    }

    LoggerWrapper(const LoggerWrapper&) = delete;
    LoggerWrapper(LoggerWrapper&&) noexcept = delete;
    LoggerWrapper& operator=(const LoggerWrapper&) = delete;
    LoggerWrapper& operator=(LoggerWrapper&&) = delete;

    virtual ~LoggerWrapper() { Py_XDECREF(_pyLogger); }

    bool isEnabled(Level level) { return _getLogLevelValue(level) >= _pythonLogLevel; }

    void log(Level level, int line, const std::string& message) {
        if (!Py_IsInitialized()) {
            // Python logger is unavailable - fallback to console logger
            _fallbackLogger->log(level, line, message);
        } else {
            PyGILState_STATE state = PyGILState_Ensure();

            try {
                switch (level) {
                    case Logger::LEVEL_DEBUG:
                        py::call_method<void>(_pyLogger, "debug", message.c_str());
                        break;
                    case Logger::LEVEL_INFO:
                        py::call_method<void>(_pyLogger, "info", message.c_str());
                        break;
                    case Logger::LEVEL_WARN:
                        py::call_method<void>(_pyLogger, "warning", message.c_str());
                        break;
                    case Logger::LEVEL_ERROR:
                        py::call_method<void>(_pyLogger, "error", message.c_str());
                        break;
                }

            } catch (const py::error_already_set& e) {
                _fallbackLogger->log(level, line, message);
            }

            PyGILState_Release(state);
        }
    }
};

class LoggerWrapperFactory : public LoggerFactory {
    std::unique_ptr<LoggerFactory> _fallbackLoggerFactory{new ConsoleLoggerFactory};
    PyObject* _pyLogger;
    Optional<int> _pythonLogLevel{Optional<int>::empty()};

    void initializePythonLogLevel() {
        PyGILState_STATE state = PyGILState_Ensure();

        try {
            int level = py::call_method<int>(_pyLogger, "getEffectiveLevel");
            _pythonLogLevel = Optional<int>::of(level);
        } catch (const py::error_already_set& e) {
            // Failed to get log level from _pyLogger, set it to empty to fallback to _fallbackLogger
            _pythonLogLevel = Optional<int>::empty();
        }

        PyGILState_Release(state);
    }

   public:
    LoggerWrapperFactory(py::object pyLogger) {
        _pyLogger = pyLogger.ptr();
        Py_XINCREF(_pyLogger);
        initializePythonLogLevel();
    }

    virtual ~LoggerWrapperFactory() { Py_XDECREF(_pyLogger); }

    Logger* getLogger(const std::string& fileName) {
        const auto fallbackLogger = _fallbackLoggerFactory->getLogger(fileName);
        if (_pythonLogLevel.is_present()) {
            return new LoggerWrapper(_pyLogger, _pythonLogLevel.value(), fallbackLogger);
        } else {
            return fallbackLogger;
        }
    }
};

/**
 * Routes client logging through a Python logger object.
 *
 * @param conf configuration to modify
 * @param logger Python logger; wrapped in a newly allocated LoggerWrapperFactory handed to `conf`
 * @return `conf`, so that calls can be chained from Python
 */
static ClientConfiguration& ClientConfiguration_setLogger(ClientConfiguration& conf, py::object logger) {
    LoggerWrapperFactory* const pythonLoggerFactory = new LoggerWrapperFactory(logger);
    conf.setLogger(pythonLoggerFactory);
    return conf;
}

void export_config() {
    using namespace boost::python;

    class_<ClientConfiguration>("ClientConfiguration")
        .def("authentication", &ClientConfiguration_setAuthentication, return_self<>())
        .def("operation_timeout_seconds", &ClientConfiguration::getOperationTimeoutSeconds)
        .def("operation_timeout_seconds", &ClientConfiguration::setOperationTimeoutSeconds, return_self<>())
        .def("connection_timeout", &ClientConfiguration::getConnectionTimeout)
        .def("connection_timeout", &ClientConfiguration::setConnectionTimeout, return_self<>())
        .def("io_threads", &ClientConfiguration::getIOThreads)
        .def("io_threads", &ClientConfiguration::setIOThreads, return_self<>())
        .def("message_listener_threads", &ClientConfiguration::getMessageListenerThreads)
        .def("message_listener_threads", &ClientConfiguration::setMessageListenerThreads, return_self<>())
        .def("concurrent_lookup_requests", &ClientConfiguration::getConcurrentLookupRequest)
        .def("concurrent_lookup_requests", &ClientConfiguration::setConcurrentLookupRequest, return_self<>())
        .def("log_conf_file_path", &ClientConfiguration::getLogConfFilePath,
             return_value_policy<copy_const_reference>())
        .def("log_conf_file_path", &ClientConfiguration::setLogConfFilePath, return_self<>())
        .def("use_tls", &ClientConfiguration::isUseTls)
        .def("use_tls", &ClientConfiguration::setUseTls, return_self<>())
        .def("tls_trust_certs_file_path", &ClientConfiguration::getTlsTrustCertsFilePath,
             return_value_policy<copy_const_reference>())
        .def("tls_trust_certs_file_path", &ClientConfiguration::setTlsTrustCertsFilePath, return_self<>())
        .def("tls_allow_insecure_connection", &ClientConfiguration::isTlsAllowInsecureConnection)
        .def("tls_allow_insecure_connection", &ClientConfiguration::setTlsAllowInsecureConnection,
             return_self<>())
        .def("tls_validate_hostname", &ClientConfiguration::setValidateHostName, return_self<>())
        .def("listener_name", &ClientConfiguration::setListenerName, return_self<>())
        .def("set_logger", &ClientConfiguration_setLogger, return_self<>());

    class_<ProducerConfiguration>("ProducerConfiguration")
        .def("producer_name", &ProducerConfiguration::getProducerName,
             return_value_policy<copy_const_reference>())
        .def("producer_name", &ProducerConfiguration::setProducerName, return_self<>())
        .def("schema", &ProducerConfiguration::getSchema, return_value_policy<copy_const_reference>())
        .def("schema", &ProducerConfiguration::setSchema, return_self<>())
        .def("send_timeout_millis", &ProducerConfiguration::getSendTimeout)
        .def("send_timeout_millis", &ProducerConfiguration::setSendTimeout, return_self<>())
        .def("initial_sequence_id", &ProducerConfiguration::getInitialSequenceId)
        .def("initial_sequence_id", &ProducerConfiguration::setInitialSequenceId, return_self<>())
        .def("compression_type", &ProducerConfiguration::getCompressionType)
        .def("compression_type", &ProducerConfiguration::setCompressionType, return_self<>())
        .def("max_pending_messages", &ProducerConfiguration::getMaxPendingMessages)
        .def("max_pending_messages", &ProducerConfiguration::setMaxPendingMessages, return_self<>())
        .def("max_pending_messages_across_partitions",
             &ProducerConfiguration::getMaxPendingMessagesAcrossPartitions)
        .def("max_pending_messages_across_partitions",
             &ProducerConfiguration::setMaxPendingMessagesAcrossPartitions, return_self<>())
        .def("block_if_queue_full", &ProducerConfiguration::getBlockIfQueueFull)
        .def("block_if_queue_full", &ProducerConfiguration::setBlockIfQueueFull, return_self<>())
        .def("partitions_routing_mode", &ProducerConfiguration::getPartitionsRoutingMode)
        .def("partitions_routing_mode", &ProducerConfiguration::setPartitionsRoutingMode, return_self<>())
        .def("lazy_start_partitioned_producers", &ProducerConfiguration::getLazyStartPartitionedProducers)
        .def("lazy_start_partitioned_producers", &ProducerConfiguration::setLazyStartPartitionedProducers,
             return_self<>())
        .def("batching_enabled", &ProducerConfiguration::getBatchingEnabled,
             return_value_policy<copy_const_reference>())
        .def("batching_enabled", &ProducerConfiguration::setBatchingEnabled, return_self<>())
        .def("batching_max_messages", &ProducerConfiguration::getBatchingMaxMessages,
             return_value_policy<copy_const_reference>())
        .def("batching_max_messages", &ProducerConfiguration::setBatchingMaxMessages, return_self<>())
        .def("batching_max_allowed_size_in_bytes", &ProducerConfiguration::getBatchingMaxAllowedSizeInBytes,
             return_value_policy<copy_const_reference>())
        .def("batching_max_allowed_size_in_bytes", &ProducerConfiguration::setBatchingMaxAllowedSizeInBytes,
             return_self<>())
        .def("batching_max_publish_delay_ms", &ProducerConfiguration::getBatchingMaxPublishDelayMs,
             return_value_policy<copy_const_reference>())
        .def("batching_max_publish_delay_ms", &ProducerConfiguration::setBatchingMaxPublishDelayMs,
             return_self<>())
        .def("property", &ProducerConfiguration::setProperty, return_self<>())
        .def("batching_type", &ProducerConfiguration::setBatchingType, return_self<>())
        .def("batching_type", &ProducerConfiguration::getBatchingType)
        .def("encryption_key", &ProducerConfiguration::addEncryptionKey, return_self<>())
        .def("crypto_key_reader", &ProducerConfiguration_setCryptoKeyReader, return_self<>());

    class_<ConsumerConfiguration>("ConsumerConfiguration")
        .def("consumer_type", &ConsumerConfiguration::getConsumerType)
        .def("consumer_type", &ConsumerConfiguration::setConsumerType, return_self<>())
        .def("schema", &ConsumerConfiguration::getSchema, return_value_policy<copy_const_reference>())
        .def("schema", &ConsumerConfiguration::setSchema, return_self<>())
        .def("message_listener", &ConsumerConfiguration_setMessageListener, return_self<>())
        .def("receiver_queue_size", &ConsumerConfiguration::getReceiverQueueSize)
        .def("receiver_queue_size", &ConsumerConfiguration::setReceiverQueueSize)
        .def("max_total_receiver_queue_size_across_partitions",
             &ConsumerConfiguration::getMaxTotalReceiverQueueSizeAcrossPartitions)
        .def("max_total_receiver_queue_size_across_partitions",
             &ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions)
        .def("consumer_name", &ConsumerConfiguration::getConsumerName,
             return_value_policy<copy_const_reference>())
        .def("consumer_name", &ConsumerConfiguration::setConsumerName)
        .def("unacked_messages_timeout_ms", &ConsumerConfiguration::getUnAckedMessagesTimeoutMs)
        .def("unacked_messages_timeout_ms", &ConsumerConfiguration::setUnAckedMessagesTimeoutMs)
        .def("negative_ack_redelivery_delay_ms", &ConsumerConfiguration::getNegativeAckRedeliveryDelayMs)
        .def("negative_ack_redelivery_delay_ms", &ConsumerConfiguration::setNegativeAckRedeliveryDelayMs)
        .def("broker_consumer_stats_cache_time_ms",
             &ConsumerConfiguration::getBrokerConsumerStatsCacheTimeInMs)
        .def("broker_consumer_stats_cache_time_ms",
             &ConsumerConfiguration::setBrokerConsumerStatsCacheTimeInMs)
        .def("pattern_auto_discovery_period", &ConsumerConfiguration::getPatternAutoDiscoveryPeriod)
        .def("pattern_auto_discovery_period", &ConsumerConfiguration::setPatternAutoDiscoveryPeriod)
        .def("read_compacted", &ConsumerConfiguration::isReadCompacted)
        .def("read_compacted", &ConsumerConfiguration::setReadCompacted)
        .def("property", &ConsumerConfiguration::setProperty, return_self<>())
        .def("subscription_initial_position", &ConsumerConfiguration::getSubscriptionInitialPosition)
        .def("subscription_initial_position", &ConsumerConfiguration::setSubscriptionInitialPosition)
        .def("crypto_key_reader", &ConsumerConfiguration_setCryptoKeyReader, return_self<>())
        .def("replicate_subscription_state_enabled",
             &ConsumerConfiguration::setReplicateSubscriptionStateEnabled)
        .def("replicate_subscription_state_enabled",
             &ConsumerConfiguration::isReplicateSubscriptionStateEnabled);

    class_<ReaderConfiguration>("ReaderConfiguration")
        .def("reader_listener", &ReaderConfiguration_setReaderListener, return_self<>())
        .def("schema", &ReaderConfiguration::getSchema, return_value_policy<copy_const_reference>())
        .def("schema", &ReaderConfiguration::setSchema, return_self<>())
        .def("receiver_queue_size", &ReaderConfiguration::getReceiverQueueSize)
        .def("receiver_queue_size", &ReaderConfiguration::setReceiverQueueSize)
        .def("reader_name", &ReaderConfiguration::getReaderName, return_value_policy<copy_const_reference>())
        .def("reader_name", &ReaderConfiguration::setReaderName)
        .def("subscription_role_prefix", &ReaderConfiguration::getSubscriptionRolePrefix,
             return_value_policy<copy_const_reference>())
        .def("subscription_role_prefix", &ReaderConfiguration::setSubscriptionRolePrefix)
        .def("read_compacted", &ReaderConfiguration::isReadCompacted)
        .def("read_compacted", &ReaderConfiguration::setReadCompacted)
        .def("crypto_key_reader", &ReaderConfiguration_setCryptoKeyReader, return_self<>());
}
